{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# **HELK: Basic Sysmon ProcessCreate Graph Query**\n",
    "## Goals:\n",
    "* Confirm Jupyter can talk to Spark & Graphframes\n",
    "* Confirm Spark & Graphframes can pull data from ES\n",
    "* Confirm Spark Standalone Cluster Manager works (2 Workers)\n",
    "* Create a graphframe from sysmon Index\n",
    "  * Creating vertices and edges dataframes\n",
    "* Running a basic query using GraphFrames Motifs"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Check the Spark Context via the variable spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/html": [
       "\n",
       "            <div>\n",
       "                <p><b>SparkSession - hive</b></p>\n",
       "                \n",
       "        <div>\n",
       "            <p><b>SparkContext</b></p>\n",
       "\n",
       "            <p><a href=\"http://729edcf1ff97:4040\">Spark UI</a></p>\n",
       "\n",
       "            <dl>\n",
       "              <dt>Version</dt>\n",
       "                <dd><code>v2.3.0</code></dd>\n",
       "              <dt>Master</dt>\n",
       "                <dd><code>spark://helk-spark-master:7077</code></dd>\n",
       "              <dt>AppName</dt>\n",
       "                <dd><code>PySparkShell</code></dd>\n",
       "            </dl>\n",
       "        </div>\n",
       "        \n",
       "            </div>\n",
       "        "
      ],
      "text/plain": [
       "<pyspark.sql.session.SparkSession at 0x7f14ccf98780>"
      ]
     },
     "execution_count": 1,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "spark"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Import Graphframes & SQL Functions"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from graphframes import *"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.functions import *"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Set a Custom Spark Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark_graph = SparkSession \\\n",
    "    .builder \\\n",
    "    .appName(\"HELK\") \\\n",
    "    .config(\"es.read.field.as.array.include\", \"tags\") \\\n",
    "    .config(\"es.nodes\",\"helk-elasticsearch:9200\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Read data from the HELK (Elasticsearch-Sysmon Index)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = spark_graph.read.format(\"org.elasticsearch.spark.sql\").load(\"logs-endpoint-winevent-sysmon-*/doc\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Print DataFrame Schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- @date_creation: timestamp (nullable = true)\n",
      " |-- @date_creation_previous: timestamp (nullable = true)\n",
      " |-- @timestamp: timestamp (nullable = true)\n",
      " |-- @version: string (nullable = true)\n",
      " |-- action: string (nullable = true)\n",
      " |-- beat: struct (nullable = true)\n",
      " |    |-- hostname: string (nullable = true)\n",
      " |    |-- name: string (nullable = true)\n",
      " |    |-- version: string (nullable = true)\n",
      " |-- device_name: string (nullable = true)\n",
      " |-- event_id: integer (nullable = true)\n",
      " |-- file_company: string (nullable = true)\n",
      " |-- file_description: string (nullable = true)\n",
      " |-- file_name: string (nullable = true)\n",
      " |-- file_product: string (nullable = true)\n",
      " |-- file_version: string (nullable = true)\n",
      " |-- geoip: struct (nullable = true)\n",
      " |    |-- city_name: string (nullable = true)\n",
      " |    |-- continent_code: string (nullable = true)\n",
      " |    |-- country_code2: string (nullable = true)\n",
      " |    |-- country_code3: string (nullable = true)\n",
      " |    |-- country_name: string (nullable = true)\n",
      " |    |-- dma_code: integer (nullable = true)\n",
      " |    |-- latitude: float (nullable = true)\n",
      " |    |-- location: struct (nullable = true)\n",
      " |    |    |-- lat: double (nullable = true)\n",
      " |    |    |-- lon: double (nullable = true)\n",
      " |    |-- longitude: float (nullable = true)\n",
      " |    |-- postal_code: string (nullable = true)\n",
      " |    |-- region_code: string (nullable = true)\n",
      " |    |-- region_name: string (nullable = true)\n",
      " |    |-- timezone: string (nullable = true)\n",
      " |-- hash_imphash: string (nullable = true)\n",
      " |-- hash_md5: string (nullable = true)\n",
      " |-- hash_sha1: string (nullable = true)\n",
      " |-- hash_sha256: string (nullable = true)\n",
      " |-- host_dst_name: string (nullable = true)\n",
      " |-- host_name: string (nullable = true)\n",
      " |-- host_src_name: string (nullable = true)\n",
      " |-- image_loaded: string (nullable = true)\n",
      " |-- image_signature: string (nullable = true)\n",
      " |-- image_signature_status: string (nullable = true)\n",
      " |-- image_signed: boolean (nullable = true)\n",
      " |-- ip_dst: string (nullable = true)\n",
      " |-- ip_dst_isipv6: string (nullable = true)\n",
      " |-- ip_src: string (nullable = true)\n",
      " |-- ip_src_isipv6: string (nullable = true)\n",
      " |-- level: string (nullable = true)\n",
      " |-- log_name: string (nullable = true)\n",
      " |-- network_initiated: boolean (nullable = true)\n",
      " |-- network_protocol: string (nullable = true)\n",
      " |-- opcode: string (nullable = true)\n",
      " |-- pipe_name: string (nullable = true)\n",
      " |-- port_dst_name: string (nullable = true)\n",
      " |-- port_dst_number: integer (nullable = true)\n",
      " |-- port_number: integer (nullable = true)\n",
      " |-- port_src_number: integer (nullable = true)\n",
      " |-- port_src_port_name: string (nullable = true)\n",
      " |-- process_calltrace: string (nullable = true)\n",
      " |-- process_command_line: string (nullable = true)\n",
      " |-- process_current_directory: string (nullable = true)\n",
      " |-- process_granted_access: string (nullable = true)\n",
      " |-- process_guid: string (nullable = true)\n",
      " |-- process_id: integer (nullable = true)\n",
      " |-- process_integrity_level: string (nullable = true)\n",
      " |-- process_name: string (nullable = true)\n",
      " |-- process_parent_command_line: string (nullable = true)\n",
      " |-- process_parent_guid: string (nullable = true)\n",
      " |-- process_parent_id: integer (nullable = true)\n",
      " |-- process_parent_name: string (nullable = true)\n",
      " |-- process_parent_path: string (nullable = true)\n",
      " |-- process_path: string (nullable = true)\n",
      " |-- process_target_guid: string (nullable = true)\n",
      " |-- process_target_id: integer (nullable = true)\n",
      " |-- process_target_name: string (nullable = true)\n",
      " |-- process_target_path: string (nullable = true)\n",
      " |-- process_thread_id: string (nullable = true)\n",
      " |-- provider_guid: string (nullable = true)\n",
      " |-- record_number: string (nullable = true)\n",
      " |-- registry_event_type: string (nullable = true)\n",
      " |-- registry_key_path: string (nullable = true)\n",
      " |-- registry_key_value: string (nullable = true)\n",
      " |-- service_state: string (nullable = true)\n",
      " |-- source_name: string (nullable = true)\n",
      " |-- sysmon_schema_version: string (nullable = true)\n",
      " |-- sysmon_version: string (nullable = true)\n",
      " |-- tags: array (nullable = true)\n",
      " |    |-- element: string (containsNull = true)\n",
      " |-- task: string (nullable = true)\n",
      " |-- thread_id: integer (nullable = true)\n",
      " |-- thread_new_id: integer (nullable = true)\n",
      " |-- thread_startaddress: string (nullable = true)\n",
      " |-- thread_startfunction: string (nullable = true)\n",
      " |-- thread_startmodule: string (nullable = true)\n",
      " |-- type: string (nullable = true)\n",
      " |-- user_domain: string (nullable = true)\n",
      " |-- user_logon_guid: string (nullable = true)\n",
      " |-- user_logon_id: long (nullable = true)\n",
      " |-- user_name: string (nullable = true)\n",
      " |-- user_reporter_domain: string (nullable = true)\n",
      " |-- user_reporter_name: string (nullable = true)\n",
      " |-- user_reporter_sid: string (nullable = true)\n",
      " |-- user_reporter_type: string (nullable = true)\n",
      " |-- user_terminal_session_id: integer (nullable = true)\n",
      " |-- version: integer (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create Vertices & Edges Dataframes"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "vertices = df.withColumn(\"id\", df.process_guid).select(\"id\",\"user_name\",\"host_name\",\"process_parent_name\",\"process_name\",\"action\")\n",
    "vertices = vertices.filter(vertices.action == \"processcreate\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------------------------+---------+--------------+-------------------+-------------+-------------+\n",
      "|id                                  |user_name|host_name     |process_parent_name|process_name |action       |\n",
      "+------------------------------------+---------+--------------+-------------------+-------------+-------------+\n",
      "|A98268C1-B131-5AEA-0000-00100AAB6D01|wardog   |DESKTOP-WARDOG|svchost.exe        |taskhostw.exe|processcreate|\n",
      "|A98268C1-B18F-5AEA-0000-001077476F01|SYSTEM   |DESKTOP-WARDOG|svchost.exe        |wermgr.exe   |processcreate|\n",
      "|A98268C1-B1A0-5AEA-0000-001069F07001|SYSTEM   |DESKTOP-WARDOG|svchost.exe        |TiWorker.exe |processcreate|\n",
      "+------------------------------------+---------+--------------+-------------------+-------------+-------------+\n",
      "only showing top 3 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "vertices.show(3,truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "edges = df.filter(df.action == \"processcreate\").selectExpr(\"process_parent_guid as src\",\"process_guid as dst\").withColumn(\"relationship\", lit(\"spawned\"))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+------------------------------------+------------------------------------+------------+\n",
      "|src                                 |dst                                 |relationship|\n",
      "+------------------------------------+------------------------------------+------------+\n",
      "|A98268C1-9E15-5AE6-0000-0010A2BD0000|A98268C1-B0A3-5AEA-0000-0010CA9C6B01|spawned     |\n",
      "|A98268C1-9E15-5AE6-0000-0010A2BD0000|A98268C1-B086-5AEA-0000-0010730F6901|spawned     |\n",
      "|A98268C1-9E15-5AE6-0000-0010A2BD0000|A98268C1-B0F3-5AEA-0000-001043326D01|spawned     |\n",
      "+------------------------------------+------------------------------------+------------+\n",
      "only showing top 3 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "edges.show(3,truncate=False)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create a Graph (Vertices & Edges DataFrames)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "g = GraphFrame(vertices, edges)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Look for (Process A spawning Process B AND Process B Spawning Process C) "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "motifs = g.find(\"(a)-[]->(b);(b)-[]->(c)\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------------+------------+-------------------+-------------------+-------------------+----------------------+\n",
      "|process_parent_name    |process_name|process_parent_name|process_name       |process_parent_name|process_name          |\n",
      "+-----------------------+------------+-------------------+-------------------+-------------------+----------------------+\n",
      "|smss.exe               |wininit.exe |wininit.exe        |services.exe       |services.exe       |svchost.exe           |\n",
      "|wininit.exe            |services.exe|services.exe       |SearchIndexer.exe  |SearchIndexer.exe  |SearchProtocolHost.exe|\n",
      "|smss.exe               |wininit.exe |wininit.exe        |services.exe       |services.exe       |svchost.exe           |\n",
      "|smss.exe               |smss.exe    |smss.exe           |wininit.exe        |wininit.exe        |lsass.exe             |\n",
      "|ManagementAgentHost.exe|cmd.exe     |cmd.exe            |cmd.exe            |cmd.exe            |findstr.exe           |\n",
      "|null                   |smss.exe    |smss.exe           |smss.exe           |smss.exe           |wininit.exe           |\n",
      "|services.exe           |svchost.exe |svchost.exe        |CompatTelRunner.exe|CompatTelRunner.exe|CompatTelRunner.exe   |\n",
      "|wininit.exe            |services.exe|services.exe       |svchost.exe        |svchost.exe        |WMIADAP.exe           |\n",
      "|smss.exe               |wininit.exe |wininit.exe        |services.exe       |services.exe       |svchost.exe           |\n",
      "|services.exe           |svchost.exe |svchost.exe        |Defrag.exe         |Defrag.exe         |conhost.exe           |\n",
      "+-----------------------+------------+-------------------+-------------------+-------------------+----------------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "motifs.select(\"a.process_parent_name\",\"a.process_name\",\"b.process_parent_name\",\"b.process_name\",\"c.process_parent_name\",\"c.process_name\").show(10,truncate=False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.5.2"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
